/*
 * Copyright 2018- The Pixie Authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * SPDX-License-Identifier: Apache-2.0
 */

syntax = "proto3";

package px.carnot.planpb;

option go_package = "planpb";

import "github.com/gogo/protobuf/gogoproto/gogo.proto";
import "google/protobuf/wrappers.proto";
import "src/api/proto/uuidpb/uuid.proto";
import "src/shared/types/typespb/types.proto";

message PlanOptions {
  // Show the execution plan for the given query without executing the query.
  bool explain = 2;
  // Execute the query and show the execution plan for the query, along with
  // query execution metrics.
  bool analyze = 3;
  // Max number of output rows per table.
  // This limit applies to the entire result for batch tables, and per window on windowed
  // streaming queries.
  int64 max_output_rows_per_table = 4;
  // Reserved for prior fields (distributed).
  reserved 1;
}

// The plan is composed of multiple fragments which each
// define a part of the plan that needs to be executed sequentially
// in order to resolve dependencies (ex. Blocking Join).
message Plan {
  // The DAG which stores the connection information of the plan fragments.
  DAG dag = 1;
  // List of plan fragments referenced by the DAG.
  repeated PlanFragment nodes = 2;
  // Options about how the plan should be executed.
  PlanOptions plan_options = 4;
  // The incoming connections to this node.
  repeated uuidpb.UUID incoming_agent_ids = 5 [ (gogoproto.customname) = "IncomingAgentIDs" ];
  // The GRPC Addresses for servers that track execution statuses such as init conn, stats, and
  // errors generated by this plan during execution.
  message ExecutionStatusDestination {
    string grpc_address = 1;
    string ssl_targetname = 2;
  }
  repeated ExecutionStatusDestination execution_status_destinations = 6;
  reserved 3;
}

// Each plan fragment is a DAG represented in a format similar to an adjacency
// list: a list of nodes and interconnections between nodes.
message PlanFragment {
  // The ID of the plan fragment.
  uint64 id = 1;
  // The DAG which stores the connection information.
  DAG dag = 2;
  // List of nodes referenced by the DAG.
  repeated PlanNode nodes = 3;
}

// The DAG simply stores the dependencies for each node.
message DAG {
  // DAG node stores information about each node and it's dependencies.
  message DAGNode {
    // The ID of the node. This is the same as the ID used for PlanNode or PlanFragment.
    uint64 id = 1;
    // The sorted parents of this DAG node.
    repeated uint64 sorted_parents = 4;
    // The sorted children of this DAG node.
    repeated uint64 sorted_children = 3;
  }
  // List of dependencies.
  repeated DAGNode nodes = 1;
}

// A execution node used by the query plan.
message PlanNode {
  // A unique ID used to reference this node.
  uint64 id = 1;
  // The operation that this node performs.
  Operator op = 2;
}

enum OperatorType {
  OPERATOR_TYPE_UNKNOWN = 0;
  // Source operators are rangs 1000 - 2000.
  MEMORY_SOURCE_OPERATOR = 1000;
  GRPC_SOURCE_OPERATOR = 1100;
  UDTF_SOURCE_OPERATOR = 1200;
  EMPTY_SOURCE_OPERATOR = 1300;
  // Regular operators are range 2000 - 10000.
  MAP_OPERATOR = 2000;
  AGGREGATE_OPERATOR = 2100;
  FILTER_OPERATOR = 2200;
  LIMIT_OPERATOR = 2300;
  UNION_OPERATOR = 2400;
  JOIN_OPERATOR = 2500;
  // Sink operators are range 9000-10000.
  MEMORY_SINK_OPERATOR = 9000;
  GRPC_SINK_OPERATOR = 9100;
  OTEL_EXPORT_SINK_OPERATOR = 9200;
}

// The Logical operation performed. Each operator needs and entry in this
// message.
message Operator {
  OperatorType op_type = 1;
  oneof op {
    // Fetches data from in memory storage.
    MemorySourceOperator mem_source_op = 2;
    // Performs a simple Map operation.
    MapOperator map_op = 3;
    // Performs a windowed or blocking aggregate.
    AggregateOperator agg_op = 4;
    // Operator that stores data to memory.
    MemorySinkOperator mem_sink_op = 5;
    // Performs a compacting Filter operation.
    FilterOperator filter_op = 6;
    // Operator that performs a limit.
    LimitOperator limit_op = 7;
    // Operator that performs an ordered union across multiple inputs.
    UnionOperator union_op = 8;
    // Operator that reads data from a remote node via grpc.
    GRPCSourceOperator grpc_source_op = 9 [ (gogoproto.customname) = "GRPCSourceOp" ];
    // Operator that stores data to a remote node via grpc.
    GRPCSinkOperator grpc_sink_op = 1000 [ (gogoproto.customname) = "GRPCSinkOp" ];
    // Operator that performs a join based on column equality and/or time_ columns within a certain
    // range of each other.
    JoinOperator join_op = 11;
    // Operator that represents the UDTF source.
    UDTFSourceOperator udtf_source_op = 12;
    // EmptySourceOperator represents an operator that outputs empty rowbatches.
    EmptySourceOperator empty_source_op = 13;
    // OTelExportSinkOperator writes the input table to an OpenTelemetry endpoint.
    OTelExportSinkOperator otel_sink_op = 14 [ (gogoproto.customname) = "OTelSinkOp" ];
  }
}

// Fetches data from in-memory source.
message MemorySourceOperator {
  // Name of the table.
  string name = 1;
  // The column indexes to fetch.
  repeated int64 column_idxs = 2;
  // The names for the columns.
  repeated string column_names = 3;
  // The types of the columns.
  repeated px.types.DataType column_types = 4;
  // The start time (can be omitted if not time series).
  google.protobuf.Int64Value start_time = 5;
  // The stop time (can be omitted if not time series).
  google.protobuf.Int64Value stop_time = 6;
  // Optional tabletization value that is passed in if the source is tabletized.
  string tablet = 7;
  // Whether or not the MemorySource should return results
  // in the future (i.e. results not yet in the table)
  bool streaming = 8;
}

// Writes to in-memory storage.
message MemorySinkOperator {
  // Storage is organized as tables, this specifies the name of the table.
  string name = 1;
  // The types of the columns.
  repeated px.types.DataType column_types = 2;
  // The names of the columns.
  repeated string column_names = 3;
  // The semantic types of the columns.
  repeated px.types.SemanticType column_semantic_types = 4;
}

// Reads from a GRPC service that other machines send RowBatches to.
message GRPCSourceOperator {
  // The types of the columns that the source generates.
  repeated px.types.DataType column_types = 1;
  // The names of the columns that the source generates.
  repeated string column_names = 2;
}

// Writes to a remote machine via GRPC.
message GRPCSinkOperator {
  // The address of the GRPC service.
  string address = 1;
  // Originally used for `destination_id` field, now renamed to `grpc_source_id`.
  reserved 2;
  // This message is used when the GRPCSink produces a final result table, as opposed to an
  // internal result that is ingested by a corresponding GRPCSource.
  message ResultTable {
    // The name of the output table.
    string table_name = 1;
    // The following fields are used when destination is set to `table_name`.
    // The types of the columns.
    repeated px.types.DataType column_types = 2;
    // The names of the columns.
    repeated string column_names = 3;
    // The semantic types of the columns.
    repeated px.types.SemanticType column_semantic_types = 4;
  }
  // GRPCSinkOperator refers to its corresponding GRPCSourceOperator to each other via its DAG ID.
  oneof destination {
    // The ID of the GRPC Source node that will receive the RowBatch, when the sink's RowBatches
    // are being sent to another Carnot instance.
    uint64 grpc_source_id = 3 [ (gogoproto.customname) = "GRPCSourceID" ];
    // The name of the table that row batches from this sink belong to, when the sink's RowBatches
    // are being sent to a non-Carnot address, such as the query broker.
    ResultTable output_table = 4;
  }
  // Options regarding the GRPC connection to be established.
  message GRPCConnectionOptions {
    // This field is used when there is a need for an SSL target hostname override.
    string ssl_targetname = 1;
  }
  GRPCConnectionOptions connection_options = 5;
}

// Performs map operation.
message MapOperator {
  // Each scalar operator defines an output column.
  // In order for columns to be copied they must be references by
  // the scalar expression.
  repeated ScalarExpression expressions = 1;
  repeated string column_names = 2;
}

// Aggregate does a group by the "group" Columns and computes
// the aggregates based on aggregate expression. Number of
// output columns is equal to the len(values).
message AggregateOperator {
  // The functions to execute for this aggregate.
  // Note that only agg funcs are valid here.
  repeated AggregateExpression values = 1;
  // The columns to use for grouping. For grouping involving a function
  // a map operation will need to be performed first.
  repeated Column groups = 2;
  // The names of the output groups.
  repeated string group_names = 3;
  // The names of values.
  repeated string value_names = 4;
  // Whether to do a windowed (streaming) aggregate or a blocking aggregate.
  bool windowed = 5;
  // The following two flags are used to support partial aggs.
  // 1. partial_agg -> perform a partial aggregate.
  // 2. finalize_results -> merge partial aggregate results.
  // 3. partial_agg && finalize_results -> do a single full aggregate.
  // Whether this aggregate partially aggregates the input.
  bool partial_agg = 6;
  // Whether this merges the results of partial aggregates.
  bool finalize_results = 7;
}

// Performs a compacting filter
message FilterOperator {
  // The scalar expression defines the filtering expression that will be run on the data.
  ScalarExpression expression = 1;
  // Defines the columns that are passed from the previous operator.
  repeated Column columns = 2;
}

// Limit performs a limit on the results of the previous operation.
message LimitOperator {
  int64 limit = 1;
  // Defines the columns that are passed from the previous operator.
  repeated Column columns = 2;
  // List of node_ids corresponding to Memory/GRPC Sources that can be aborted
  // after this limit has processed all its rows.
  repeated uint64 abortable_srcs = 3;
}

// Union merges multiple inputs into a single output result.
// It supports reordering of columns across the inputs.
// Input relations [a:int, b:str],[b:str, a:int] would produce [a:int, b:str].
// Columns with the same name must share the same type.
// Union preserves the time_ order of the output column named time_.
message UnionOperator {
  // Each ColumnMapping corresponds to a single input relation.
  // It tells the Union operator how to transform the relation to the expected result.
  message ColumnMapping {
    // The mapping instructions, one for each input stream.
    // column_indexes=[4] would signify that input column 4 becomes output column 0.
    repeated int64 column_indexes = 1;
  }
  // Output column names
  repeated string column_names = 1;
  repeated ColumnMapping column_mappings = 2;
  uint64 rows_per_batch = 3;
}

// Join performs an equijoin join across two input tables.
// The first table is the left table, the second table is the right table.
// It specifies the output columns as well as the join type.
// It preserves order across the output time_ column, if there is one.
message JoinOperator {
  // The Join method for this operator.
  // INNER: only return rows that join both left and right.
  // LEFT_OUTER: return all left rows and those right rows that join to a left row.
  // FULL_OUTER: return all left rows and right rows, joining those that match
  // the equality condition.
  enum JoinType {
    INNER = 0;
    // Right outer joins should be mapped into left outer joins by the compiler.
    LEFT_OUTER = 1;
    FULL_OUTER = 3;
  }
  // Equality condition references two columns that are compared for equality
  // during the Join process. Left and right are joined when these column
  // values are equal.
  message EqualityCondition {
    uint64 left_column_index = 1;
    uint64 right_column_index = 2;
  }

  message ParentColumn {
    // parent_index indicates which parent table this column comes from.
    // So 0 means it comes from the left table, 1 means it comes from the right table.
    uint64 parent_index = 1;
    uint64 column_index = 2;
  }

  JoinType type = 1;
  // The conditions for the equijoin, which are ANDed together.
  repeated EqualityCondition equality_conditions = 2;
  repeated ParentColumn output_columns = 3;
  // Names of the output columns.
  repeated string column_names = 4;
  // Number of rows we send over per output batch.
  uint64 rows_per_batch = 5;
}

// UDTFSourceOperator represents a table generating function.
message UDTFSourceOperator {
  // The name of the UDTF.
  string name = 1;
  // The values to assign the arguments. Must be the same length as arg_name.
  repeated ScalarValue arg_values = 2;
}

// Source operator that returns an empty batch.
message EmptySourceOperator {
  // The names for the columns.
  repeated string column_names = 1;
  // The types of the columns.
  repeated px.types.DataType column_types = 2;
}

// OTelSpan maps operator columns to each field in the OpenTelemetry Span configuration.
// The mapping ensures that each row of the table will be a separate span.
// Maps to the config described here:
// https://github.com/open-telemetry/opentelemetry-proto/blob/0e254b5c04614244edb35a9d4bf73ca26b44438a/opentelemetry/proto/trace/v1/trace.proto#L84
message OTelSpan {
  // A name can either be shared across all Spans or be set by a value in a column.
  // name_string sets for all, name_column sets the Span name based on the column value.
  oneof name {
    string name_string = 1;
    int64 name_column_index = 2;
  }
  // A mapping of the OpenTelemetry attribute name to the data that populates that attribute.
  repeated OTelAttribute attributes = 3;

  // (Optional) The column index containing trace id.
  // Data must be exactly 16 bytes in length.
  // If the index < 0, the operator will auto-generate values for this field. If the index > 0 and
  // the value is empty, the operator will auto-generate the value for this field.
  int64 trace_id_column_index = 4 [ (gogoproto.customname) = "TraceIDColumn" ];
  // (Optional) The column containing the span id.
  // Data must be exactly 8 bytes in length.
  // If the index < 0, the operator will auto-generate values for this field. If the index > 0 and
  // the value is empty, the operator will auto-generate the value for this field.
  int64 span_id_column_index = 5 [ (gogoproto.customname) = "SpanIDColumn" ];
  // (Optional) The column of the parent span id.
  // Data must be exactly 8 bytes in length.
  // If the index < 0, the operator will keep the field as empty. If the index > 0 and the value
  // is empty, the operator will not specify the OpenTelemetry field.
  int64 parent_span_id_column_index = 6 [ (gogoproto.customname) = "ParentSpanIDColumn" ];
  //  start_time_column points to the column that contains the start
  // time of the aggregation. Column must be of type TIME64NS.
  int64 start_time_column_index = 7;
  // end_time_column points to the column that contains the end time of the
  // aggregation. Column must be of type TIME64NS.
  int64 end_time_column_index = 8;

  // kind_value is the raw integer value for the
  // ::opentelemetry::proto::trace::v1::Span::SpanKind. If you use a kind_value that's not a
  // valid enum value, the value will be considered undefined.
  int64 kind_value = 9;
}

// OTelMetricGauge maps operator columns to the fields of the OpenTelemetry Gauge metric.
// configurations.
message OTelMetricGauge {
  // The column that contains the Gauge value in each row.
  // Must be either a FLOAT64 or an INT64
  oneof value_column {
    int64 float_column_index = 1;
    int64 int_column_index = 2;
  }
}

// OTelMetricSummary maps operator columns to the fields of OpenTelemetry summary metric.
// Allows users to specify statistics about the distribution, including the quantile
// values.
message OTelMetricSummary {
  // The column that contains the number of values in the population. The values in the column
  // must be non-negative.
  int64 count_column_index = 1;
  // (Optional) The column that contains the sum of values in the population. If the index is < 0,
  // the column will be considered unspecified.
  int64 sum_column_index = 2;

  // The mapping of a quantile value to column that contains the value.
  // Some examples:
  // p50 would have a quantile value of 0.5
  // p99 would have a quantile value of 0.99
  // max value (p100) would be quantile 1.0
  // min value (p0) would be quantile 0.0
  message ValueAtQuantile {
    // The quantile of the distirbution, must be [0.0, 1.0].
    double quantile = 1;
    // The column containing quantile info. The value column must be a FLOAT64
    // and values may not be negative.
    int64 value_column_index = 2;
  }
  repeated ValueAtQuantile quantile_values = 3;
}

message OTelAttribute {
  // The name of the attribute.
  string name = 1;
  message Column {
    // The type of the attribute column.
    px.types.DataType column_type = 1;
    // The index of the atribute column.
    int64 column_index = 2;
    // Whether the column is optionally JSON encoded.
    // That means the column might be an element not encoded as json
    // or a json array of multiple values.
    bool can_be_json_encoded_array = 3;
  }
  // The source of the attribute value.
  oneof value {
    // The column that contains the attribute value
    Column column = 2;
    // The actual string value to store in the attribute.
    string string_value = 3;
  }
}

// OTelMetric describes the column mapping to each field of an OpenTelemetry Metric.
// Maps to the OpenTelemetry config described here:
// https://github.com/open-telemetry/opentelemetry-proto/blob/0e254b5c04614244edb35a9d4bf73ca26b44438a/opentelemetry/proto/metrics/v1/metrics.proto#L160
message OTelMetric {
  string name = 1;
  string description = 2;
  // unit which the metric value is reported. Follows the format described by
  // http://unitsofmeasure.org/ucum.html.
  string unit = 3;
  // A mapping of the OpenTelemetry attribute name to the data that populates that attribute.
  repeated OTelAttribute attributes = 4;

  // The column that contains the end time of the aggregation.
  // Must be TIME64NS.
  int64 time_column_index = 5;

  oneof data {
    OTelMetricGauge gauge = 101;
    OTelMetricSummary summary = 102;
  }
}

// OTelEndpointConfig contains the connection parameters for an OpenTelemetry
// Exporter.
message OTelEndpointConfig {
  // url is the address of the OpenTelemetry collector.
  string url = 1 [ (gogoproto.customname) = "URL" ];
  // The headers that should be attached to the connection context, such as
  // authentication credentials or api keys.
  map<string, string> headers = 2;
  // True if the OpenTelemetry collector is not protected with SSL.
  bool insecure = 3;
  // The number of seconds before a request to the OpenTelemetry collector times out.
  int64 timeout = 4;
}

// Defines a resource. Discussed in depth in the OpenTelemetry spec.
// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/resource/sdk.md
message OTelResource {
  repeated OTelAttribute attributes = 1;
}

// OTelExportSinkOperator defines an operator that exports the given table to OpenTelemetry.
// The operator's job is to map columns to specific fields of the OpenTelemetry
// interface. The operator implementation will then write each row of the tables
// into the OpenTelemetry format.
message OTelExportSinkOperator {
  // EndpointConfig is the connection parameters to the OpenTelemetry collector.
  OTelEndpointConfig endpoint_config = 1;
  // Resource describes where the telemetry data comes from.
  OTelResource resource = 2;
  // Metrics describest the exported metrics for this resource.
  repeated OTelMetric metrics = 3;
  // Metrics describes the exported spans for this resource.
  repeated OTelSpan spans = 4;
}

// Scalar expression is any single valued expression.
message ScalarExpression {
  oneof value {
    // A constant value.
    ScalarValue constant = 1;
    // A value that is a column reference.
    Column column = 2;
    // A value that is a function applied on a set of columns.
    ScalarFunc func = 3;
  }
}

// ScalarValues reference a single constant value.
message ScalarValue {
  // We need to store the type to handle the null case and make sure we have the
  // "correct" null value. This type takes precedence on the one of value below.
  // If they mismatch a null value will be used.
  px.types.DataType data_type = 1;
  oneof value {
    bool bool_value = 2;
    int64 int64_value = 3;
    double float64_value = 4;
    string string_value = 5;
    int64 time64_ns_value = 6;
    px.types.UInt128 uint128_value = 7;
  }
  reserved 8;
}

// ScalarFuncs perform an operator on a set of arguments and produce a single
// valued output.
// TODO(zasgar). What do to about multi value output?
// Should they produce a list that is flattened, or a struct?
message ScalarFunc {
  // The name of the function. Should exist in UDF/UDAF registry.
  string name = 1;
  // The constructor arguments.
  repeated ScalarValue init_args = 2;
  // The arguments used during evaluation.
  repeated ScalarExpression args = 3;
  int64 id = 4;
  // The datatypes of the args.
  repeated px.types.DataType args_data_types = 5;
}

// AggregateExpressions perform an operator on a set of arguments and produce a single
// valued output.
message AggregateExpression {
  message Arg {
    oneof value {
      // A constant value.
      ScalarValue constant = 1;
      // A value that is a column reference.
      Column column = 2;
    }
  }
  // The name of the function. Should exist in UDA registry.
  string name = 3;
  // The constructor arguments.
  repeated ScalarValue init_args = 4;
  repeated Arg args = 5;
  int64 id = 6;
  // The datatypes of the args.
  repeated px.types.DataType args_data_types = 7;
}

// A column reference.
message Column {
  // The node that produces the output.
  uint64 node = 1;
  // The index of the column.
  uint64 index = 2;
}
